#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>


namespace rpc
{
    namespace server
    {
        class TopicManager
        {
        public:
            using ptr = std::shared_ptr<TopicManager>;
            void onTopicRequest(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg)
            {
                TopicOptype topic_optype = msg->optype();
                bool ret = true;
                switch(topic_optype)
                {
                    //主题的创建
                    case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;
                    //主题的删除
                    case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;
                    //主题的订阅
                    case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;
                    //主题的取消订阅
                    case TopicOptype::TOPIC_CANCLE: topicCancel(conn, msg); break;
                    //主题消息的发布
                    case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;
                    default:  return errorResponse(conn, msg, Rcode::RCODE_INVALID_OPTYPE);
                }
                if (!ret) return errorResponse(conn, msg, Rcode::RCODE_NOT_FOUND_TOPIC);
                return topicResponse(conn, msg);
            }

            //一个订阅者在连接断开时的处理---删除其关联的数据
            void onShutdown(const BaseConnection::ptr &conn) 
            {
                //消息发布者断开连接，不需要任何操作；  消息订阅者断开连接需要删除管理数据
                // 发布者只是消息的源头，不维护与其他组件的状态依赖。断开连接仅意味着：
                // 不再有新消息产生
                // 不会影响已发送的消息或订阅者的状态
                //1. 判断断开连接的是否是订阅者，不是的话则直接返回
                std::vector<Topic::ptr> topics;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _subscribers.find(conn);
                    if(it == _subscribers.end())
                        return;
                    subscriber = it->second;
                    //2. 获取到订阅者退出，受影响的主题对象
                    for(auto &topic_name : subscriber->topics)
                    {
                        auto topic_it = _topics.find(topic_name);
                        if(topic_it == _topics.end()) continue;
                        topics.emplace_back(topic_it->second);
                    }
                     //4. 从订阅者映射信息中，删除订阅者
                     _subscribers.erase(it);
                }
                //5. 从受影响的主题对象中，移除订阅者
                for (auto &topic : topics) 
                {
                    topic->removeSubscriber(subscriber);
                }
            }
        private:
            void errorResponse(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg,Rcode rcode)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setmtype(Mtype::RSP_TOPIC);
                msg_rsp->setid(msg->id());
                msg_rsp->setRcode(rcode);
                conn->send(msg_rsp);
            }
            void topicResponse(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setmtype(Mtype::RSP_TOPIC);
                msg_rsp->setid(msg->id());
                msg_rsp->setRcode(Rcode::RCODE_OK);
                conn->send(msg_rsp);
            }
            void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) 
            {
                std::unique_lock<std::mutex> lock(_mutex);
                std::string topic_name = msg->topicKey();
                auto topic = std::make_shared<Topic>(topic_name);
                _topics.insert(std::make_pair(topic_name,topic));
            }
            void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) 
            {
                // 1. 查看当前主题，有哪些订阅者，然后从订阅者中将主题信息删除掉
                // 2. 删除主题的数据 -- 主题名称与主题对象的映射关系
                std::string topic_name = msg->topicKey();
                std::unordered_set<Subscriber::ptr> subscribers;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    //在删除主题之前，先找出会受到影响的订阅者
                    auto it = _topics.find(topic_name);
                    if(it == _topics.end())
                        return;
                    subscribers = it->second->subscribers;
                    _topics.erase(it);
                }
                for(auto &subscriber : subscribers)
                {
                    subscriber->removeTopic(topic_name);
                }
            }
            bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) 
            {
                //1. 先找出主题对象，以及订阅者对象
                //   如果没有找到主题--就要报错；  但是如果没有找到订阅者对象，那就要构造一个订阅者
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(msg->topicKey());
                    if(topic_it == _topics.end())
                        return false;
                    topic = topic_it->second;
                    auto subscriber_it = _subscribers.find(conn);
                    if(subscriber_it != _subscribers.end())
                    {
                        subscriber = subscriber_it->second;
                    }
                    else
                    {
                        subscriber = std::make_shared<Subscriber>(conn);
                        _subscribers.insert(std::make_pair(conn,subscriber));
                    }
                }
                //2. 在主题对象中，新增一个订阅者对象关联的连接；  在订阅者对象中新增一个订阅的主题
                topic->appendSubscriber(subscriber);
                subscriber->appendTopic(msg->topicKey());
                return true;
            }
            void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                //找出要取消的主题对象
                //找出想要取消订阅主题的订阅者
                //删除订阅者和取消订阅主题之间的映射关系
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(msg->topicKey());
                    if(topic_it == _topics.end())
                        return;
                    topic = topic_it->second;
                    auto sub_it = _subscribers.find(conn);
                    if(sub_it == _subscribers.end())
                        return;
                    subscriber = sub_it->second;
                }
                subscriber->removeTopic(msg->topicKey());
                topic->removeSubscriber(subscriber);
            }
            bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) 
            {
                //找到需要发布的主题
                Topic::ptr topic;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(msg->topicKey());
                    if(topic_it == _topics.end())
                        return false;
                    topic = topic_it->second;
                }
                topic->putMessage(msg);
                return true;
            }
        private:
            struct Subscriber 
            {
                using ptr = std::shared_ptr<Subscriber>;
                std::mutex mutex;
                BaseConnection::ptr conn;
                std::unordered_set<std::string> topics;//订阅者订阅主题名称
                Subscriber(const BaseConnection::ptr& c)  : conn(c) {}
                //订阅主题的时候调用
                void appendTopic(const std::string& topic_name)
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    topics.emplace(topic_name);
                }
                //主题被删除 或者 取消订阅的时候，调用
                void removeTopic(const std::string& topic_name)
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    topics.erase(topic_name);
                }
            };
            struct Topic
            {
                using ptr = std::shared_ptr<Topic>;
                std::mutex mutex;
                std::string topic_name;
                std::unordered_set<Subscriber::ptr> subscribers;//订阅当前主题的订阅者
                Topic(const std::string &name) : topic_name(name){}
                void appendSubscriber(const Subscriber::ptr &subscriber) 
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    subscribers.emplace(subscriber);
                }
                void removeSubscriber(const Subscriber::ptr &subscriber) 
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    subscribers.erase(subscriber);
                }
                //收到消息发布时候调用
                void putMessage(const BaseMessage::ptr& msg)
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    for(auto &subcriber : subscribers)
                    {
                        subcriber->conn->send(msg);
                    }
                }
            };
        private:
            std::mutex _mutex;
            std::unordered_map<std::string,Topic::ptr> _topics;//主题名称和主题类之间的映射
            std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
        };


    }
}